RabbitMQ系列(九)RabbitMQ进阶

您所在的位置:网站首页 rabbitmq队列长度查询 python RabbitMQ系列(九)RabbitMQ进阶

RabbitMQ系列(九)RabbitMQ进阶

2024-07-16 14:35:49| 来源: 网络整理| 查看: 265

RabbitMQ进阶-Queue队列参数详解

文章目录 RabbitMQ进阶-Queue队列参数详解1.创建队列参数2.参数解析2.1 Message TTL2.2 Auto expire2.3 Max length2.4 Max length bytes2.5 Overflow behaviour2.6 Dead letter exchange2.7 Dead letter routing key2.8 Maximum priority2.9 Lazy mode2.10 Master locator

1.创建队列参数

我们看下队列参数

void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException; queue 队列名称durable 队列是否持久化,false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化exclusive 队列是否专属,专属的范围针对的是连接,也就是说,一个连接下面的多个信道是可见的.对于其他连接是不可见的.连接断开后,该队列会被删除.注意,不是信道断开,是连接断开.并且,就算设置成了持久化,也会删除autoDelete 如果所有消费者都断开连接了,是否自动删除.如果还没有消费者从该队列获取过消息或者监听该队列,那么该队列不会删除.只有在有消费者从该队列获取过消息后,该队列才有可能自动删除(当所有消费者都断开连接,不管消息是否获取完)arguments ,args参数我们可以通过界面看下,一共下面这么多参数 在这里插入图片描述 Message TTL : 消息生存期,可以用作延迟队列,消息延迟消费等场景 Auto expire : 队列生存期,队列多长时间(毫秒)没有被使用(访问)就会被删除.换个说法就是,当队列在指定的时间内没有被使用(访问)就会被删除 Max length : 队列可以容纳的消息的最大条数 Max length bytes : 队列可以容纳的消息的最大字节数 Overflow behaviour : 队列中的消息溢出后如何处理 Dead letter exchange : 死信队列交换机、溢出的消息需要发送到绑定该死信交换机的队列 Dead letter routing key :死信队列RK、 溢出的消息需要发送到绑定该死信交换机,并且路由键匹配的队列 Maximum priority : 最大优先级,数字越大,越优先消费,可以针对实时性强的消息优先消费 Lazy mode : 懒队列,在磁盘上尽可能多地保留消息以减少RAM使用;如果未设置,则队列将保留内存缓存以尽可能快地传递消息 Master locator : 集群相关设置,将队列设置为主位置模式,确定在节点集群上声明时队列主位置所依据的规则

下面我们一 一来验证下属性,下面的队列中的RoutingKey 很多我都直接用队列名做RoutingKey了,事先声明下

2.参数解析 2.1 Message TTL

我们新建一个TTL的队列, 设置x-message-ttl:10000,即10s,发布一条消息,看这个消息如果超过生存周期,如何处理 在这里插入图片描述 通过Publish 一条消息 在这里插入图片描述 然后观察下 消息在队列中的生存时间 在这里插入图片描述

2.2 Auto expire

队列多长时间(毫秒)没有被使用(访问)就会被删除.换个说法就是,当队列在指定的时间内没有被使用(访问)就会被删除. 新建一个队列,持久化,然后设置 周期20s,设置x-expires:20000 在这里插入图片描述 队列创建 在这里插入图片描述 过了20s后,队列自动消失,变成只有1个队列了 在这里插入图片描述 我试试如果给队列加个消费者,看看是不是能活久一点

package queue.params; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import conn.MqConnectUtil; import subscrib3.ExchangeTypeEnum; import java.util.HashMap; import java.util.Map; import static subscrib3.direct.SubscribeConst.ROUTINGKEY_C; import static subscrib3.direct.SubscribeConst.SUBSCRIBE_QUEUE_NAME_DIRECT_C; /** * 当前描述:消费者 * * @author: jiazijie * @since: 2020/6/10 下午11:30 */ public class QueueAutoExpireConsumer { /** * 队列名字 */ private final static String SIMPLE_QUEUE_NAME = "auto_expire_test"; public static void main(String[] argv) throws Exception { Connection connection = null; Channel channel = null; try { connection = MqConnectUtil.getConnectionDefault(); channel = connection.createChannel(); /*声明交换机 String exchange * 参数明细 * 1、交换机名称 * 2、交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType()); /*声明队列 * 参数明细: * 1、队列名称 * 2、是否持久化 * 3、是否独占此队列 * 4、队列不用是否自动删除 * 5、参数 */ Map arguments = new HashMap(); arguments.put("x-expires",20000); channel.queueDeclare(SIMPLE_QUEUE_NAME, true, false, false, arguments); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key */ channel.queueBind(SIMPLE_QUEUE_NAME, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C); System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); /* 消息确认机制 * autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费 * autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态 * 并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈 * !!!!!! 注意这里是 false,手动确认 */ channel.basicConsume(SIMPLE_QUEUE_NAME, false, consumer); int count = 0; while (count < 10) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'"); doSomeThing(message); //返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); count++; } } catch (Exception e) { e.printStackTrace(); } finally { channel.close(); connection.close(); } } /** * 模拟处理复杂逻辑:休眠100ms * * @param message * @throws Exception */ public static void doSomeThing(String message) throws Exception { //遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑 Thread.sleep(100); } }

在这里插入图片描述 消费者进程关掉后,队列过20s就消失了

2.3 Max length

队列可以容纳的消息的最大条数,超过这个条数,队列头部的消息将会被丢弃,注意是队列头部的消息被丢弃 现在我们创建队列容纳5个消息 生产者生产5条消息

package queue.params; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.time.LocalDate; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class MaxLengthProducer { /** * 队列名字 */ private final static String MAX_LENGTH_QUEUE_NAME = "max_length_queue"; public static void maxLengthProducer(Integer count) throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-max-length", 5); channel.queueDeclare(MAX_LENGTH_QUEUE_NAME, false, false, false, arguments); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ for (int i = 0; i < count; i++) { // 消息内容 String message = "i=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); channel.basicPublish("", MAX_LENGTH_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); } //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产 5条消息 maxLengthProducer(5); //生产 10条消息 // maxLengthProducer(10); } }

5条消息 都在队列中 在这里插入图片描述 看一下是什么消息 在这里插入图片描述 清空队列中的5条消息 Purge Messages

然后再修改代码生产10条消息,可以看到消息清零后,又产生5条,队列中也只有5条 在这里插入图片描述

看下是什么消息,可以看到消息ID为 6,7,8,9,10,已经将前5条消息丢弃掉了,只保留了最后的5条 在这里插入图片描述

2.4 Max length bytes

队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃. 我们新建一个Max length bytes 字节为3 的队列,为什么是3,因为一个汉字UTF-8编码是3个字节,所以我们输入看是否内容会被截断 队列创建

package queue.params; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class MaxLengthBytesProducer { /** * 队列名字 */ private final static String MAX_LENGTH__BYTES_QUEUE_NAME = "max_length_bytes_queue"; public static void maxLengthBytesProducer() throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-max-length-bytes", 3); channel.queueDeclare(MAX_LENGTH__BYTES_QUEUE_NAME, false, false, false, arguments); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ // 消息内容 String message = new String("测试一下".getBytes(), StandardCharsets.UTF_8); channel.basicPublish("", MAX_LENGTH__BYTES_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产 5条消息 maxLengthBytesProducer(); } }

然后启动生产者,生产消息,看队列中是否 在这里插入图片描述 观察队列发现,队列中并没有消息,并不是我们想象的,我们发送 “测试一下” 字符串,测字不会截取保留再队列中而是超出字符消息直接被丢弃了 我们修改message,改为一个字,试试

String message = new String("好".getBytes(), StandardCharsets.UTF_8);

在这里插入图片描述 结果证明,超出 max length bytes的消息会被队列直接丢弃,而不是截取

2.5 Overflow behaviour

官方 : Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head or reject-publish. 队列中的消息溢出时,如何处理这些消息.要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息.( 从上面两个参数的测试中可以看出,“drop-head” 应该是默认行为) 我们新建一个队列,看下队列溢出时候,如何处理消息,我们先设置队列长度为2,然后生产3条消息,看下默认处理

package queue.params; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.time.LocalDate; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class OverflowDropProducer { /** * 队列名字 */ private final static String OVER_DROP_QUEUE_NAME = "over_drop_queue"; public static void overflowDropProducer(Integer count) throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-max-length", 2); // arguments.put("x-overflow", "drop-head"); //不设置试试,消息如何处理,看下默认情况 channel.queueDeclare(OVER_DROP_QUEUE_NAME, false, false, false, arguments); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ for (int i = 0; i < count; i++) { // 消息内容 String message = "i=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); channel.basicPublish("", OVER_DROP_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); } //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产 10条消息 overflowDropProducer(3); } }

生产3条消息 在这里插入图片描述 查看默认的队列处理结果,只有2条消息再队列中,i=1,i=2,已经把i=0的消息删除掉了,也就是默认删除对列头部的消息 在这里插入图片描述 说明,队列把i=0的消息已经删除了,当新的消息到来的时候,超出了队列的存储限制,原来在队列中的消息就会被默认丢弃掉

因为rabbitmq不允许对已有队列的参数进行修改,我们现在我们删除队列 然后重新运行生产者,设置下参数 arguments.put(“x-overflow”, “drop-head”); 看看是不是一样的效果,可以验证,和默认是一样的 在这里插入图片描述

现在我们设置一下参数 arguments.put(“x-overflow”, “reject-publish”); 看下有什么不一样,依旧删除队列,修改代码

Map arguments = new HashMap(); arguments.put("x-max-length", 2); arguments.put("x-overflow", "reject-publish"); //不设置试试,消息如何处理,看下默认情况 channel.queueDeclare(OVER_DROP_QUEUE_NAME, false, false, false, arguments);

依旧生产3条消息 在这里插入图片描述 查看默认的队列处理结果,只有2条消息再队列中,i=0,i=1,已经把i=2的消息超出了队列限制,直接拒绝接收了,保留了原来的2条消息 在这里插入图片描述

2.6 Dead letter exchange

下一章专门讲 死信队列和死信队列路由

2.7 Dead letter routing key

下一章专门讲 死信队列和死信队列路由

2.8 Maximum priority

设置该队列中的消息的优先级最大值.发布消息的时候,可以指定消息的优先级,优先级高的先被消费.如果没有设置该参数,那么该队列不支持消息优先级功能.即使发布消息的时候传入了优先级的值,也不会起什么作用

设置队列arguments.put(“x-max-priority”, 100); 我们发送6条消息,分别设置标识id为 1, 50, 255,200,255-1,256 看下6条优先级消息的处理情况 分析: 为什么要发6条消息,x-max-priority 最大优先级范围在 0~255之间,所以我们设置了 两个255的消息,id分别为255和255-1,到时候看下这两个消息的优先级,而且我们还设置了一个 256的id,看下这个的优先级,注意我是先放255,再放200,再放255-1的

package queue.params; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.time.LocalDate; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class MaxPriorityProducer { /** * 队列名字 */ private final static String MAX_PRIORITY_QUEUE_NAME = "max_priority_queue"; public static void maxPriorityProducer() throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-max-priority", 100); channel.queueDeclare(MAX_PRIORITY_QUEUE_NAME, false, false, false, arguments); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .priority(1) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 * * 设置优先级 为 1的消息 */ String message1 = "i=1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop1 = new AMQP.BasicProperties().builder() .priority(1) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop1, message1.getBytes()); System.out.println(" **** Producer Sent Message: [" + message1 + "]"); //设置优先级为 50 的 消息 String message50 = "i=50" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop50 = new AMQP.BasicProperties().builder() .priority(50) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop50, message50.getBytes()); System.out.println(" **** Producer Sent Message: [" + message50 + "]"); //设置优先级为 255 的 消息 String message255 = "i=255" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop255 = new AMQP.BasicProperties().builder() .priority(255) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop255, message255.getBytes()); System.out.println(" **** Producer Sent Message: [" + message255 + "]"); //设置优先级为 200 的 消息 String message200 = "i=200" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop200 = new AMQP.BasicProperties().builder() .priority(200) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop200, message200.getBytes()); System.out.println(" **** Producer Sent Message: [" + message200 + "]"); //设置优先级为 255 的 消息 String message255_1 = "i=255-1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop255_1 = new AMQP.BasicProperties().builder() .priority(255) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop255_1, message255_1.getBytes()); System.out.println(" **** Producer Sent Message: [" + message255_1 + "]"); //设置优先级为 256 的 消息 String message256 = "i=256" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop256 = new AMQP.BasicProperties().builder() .priority(256) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop256, message256.getBytes()); System.out.println(" **** Producer Sent Message: [" + message256 + "]"); //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产 消息 maxPriorityProducer(); } }

生产6条消息 在这里插入图片描述

看下队列中的消息 在这里插入图片描述

我们看下消息,获取消息时候优先消费谁呢?这个200、255、255-1、256的顺序如何定义 分析: 我们发送顺序是 1, 50, 255,200,255-1,256 x-max-priority=100

1. 大于max-priority 小于 256的优先级 超出队列定义100 且小于256的 这一部分 就是 255、200、255-1,他们三条消息是优先级平等的,都按照最大优先级设置、且消费顺序按照先进先出,255先消费、200第二消费、255-1最后消费 2. 大于255的,比如我们设置的256 大于255,队列优先级范围的,同意按照优先级 0来处理,所以256应该是最后消费的,优先级比 1还小 3. 在max-prority范围内的,也就是小于100的这一部分 按照正常优先级顺序,先消费大的,在消费小的 综上所述,我们的消费顺序应该是 255、200、255-1、50、1、256这个顺序,我们来验证一下吧,查看队列消息 在这里插入图片描述

2.9 Lazy mode

懒人模式.该模式下的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用.当消费者开始消费的时候才加载到内存中;如果没有设置懒人模式,队列则会直接利用内存缓存,以最快的速度传递消息.

生产两个队列的消息,看下消息的区别

package queue.params; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.time.LocalDate; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class LazyModeProducer { /** * 队列名字 */ private final static String LAZY_QUEUE_NAME = "lazy_mode_queue"; private final static String NORMAL_QUEUE_NAME = "normal_mode_queue"; public static void lazyModeProducer() throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-queue-mode", "lazy"); channel.queueDeclare(LAZY_QUEUE_NAME, false, false, false, arguments); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ // 消息内容 String message = "i=1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); channel.basicPublish("", LAZY_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); //关闭通道和连接 channel.close(); connection.close(); } public static void normalModeProducer() throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, null); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ // 消息内容 String message = "i=2" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); channel.basicPublish("", NORMAL_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产消息 lazyModeProducer(); normalModeProducer(); } }

看下队列中的消息 在这里插入图片描述 看下队列的详细信息,对比下两个队列的区别,懒人模式消息不占用内存,直到有了消费者才开始消费的时候,才会读到内存开始消费 在这里插入图片描述

2.10 Master locator

这个是关于集群配置的,把队列设置为主位置模式,确定在节点集群上声明时队列主位置所依据的规则

下一章 RabbitMQ系列(十)RabbitMQ进阶-Queue队列参数详解-死信交换机



【本文地址】

公司简介

联系我们

今日新闻


点击排行

实验室常用的仪器、试剂和
说到实验室常用到的东西,主要就分为仪器、试剂和耗
不用再找了,全球10大实验
01、赛默飞世尔科技(热电)Thermo Fisher Scientif
三代水柜的量产巅峰T-72坦
作者:寞寒最近,西边闹腾挺大,本来小寞以为忙完这
通风柜跟实验室通风系统有
说到通风柜跟实验室通风,不少人都纠结二者到底是不
集消毒杀菌、烘干收纳为一
厨房是家里细菌较多的地方,潮湿的环境、没有完全密
实验室设备之全钢实验台如
全钢实验台是实验室家具中较为重要的家具之一,很多

推荐新闻


图片新闻

实验室药品柜的特性有哪些
实验室药品柜是实验室家具的重要组成部分之一,主要
小学科学实验中有哪些教学
计算机 计算器 一般 打孔器 打气筒 仪器车 显微镜
实验室各种仪器原理动图讲
1.紫外分光光谱UV分析原理:吸收紫外光能量,引起分
高中化学常见仪器及实验装
1、可加热仪器:2、计量仪器:(1)仪器A的名称:量
微生物操作主要设备和器具
今天盘点一下微生物操作主要设备和器具,别嫌我啰嗦
浅谈通风柜使用基本常识
 众所周知,通风柜功能中最主要的就是排气功能。在

专题文章

    CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭